-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
streamingccl: avoid passing evalCtx
, txn
as parameters to ingestion & replication funcs
#90964
Conversation
StreamIngestionManager
and ReplicationStreamManager
under sql.plannerStreamIngestionManager
and ReplicationStreamManager
to sql.planner
This still has a cycle. Define the interfaces in question in package streaming
type StreamID = streamingpb.StreamID Also, rather than embedding them in |
Not sure if I understood it correctly, but isn't this making |
Moving the interface definitions to What does "weight" mean to you in this context?
:) No, eval cannot import sql. The idea here is that we need to refactor One way I think about it is that the |
I'm realizing it may not be clear exactly what I'm suggesting. The implementations of these interfaces ought to be constructed from |
8dd7c76
to
f5e462a
Compare
I meant the number of imported pkgs. But it was because I thought you meant having the implementation in eval too, so nvm.
Hmm I can't find anything like I now created a |
pkg/sql/streaming.go
Outdated
// Copyright 2022 The Cockroach Authors. | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm surprised to find all this new logic here. Is it just copied from streamingest
? This seems weird to me. Can you keep it there and just call that code from the sql package?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it to use those in streamingest
and streamproducer
, but directly calling them from sql
caused this loop
.-> //pkg/sql:sql (d5981f1a3d4a46828dfe84289c55e9bfde67ac814b3f9fd2987adc5d9089163d)
| //pkg/ccl/streamingccl/streamproducer:streamproducer (d5981f1a3d4a46828dfe84289c55e9bfde67ac814b3f9fd2987adc5d9089163d)
`-- //pkg/sql:sql (d5981f1a3d4a46828dfe84289c55e9bfde67ac814b3f9fd2987adc5d9089163d)
I made the 3rd commit to break sql
's dependency on streamproducer
with injection. Not very sure if it's correct though...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And here's an ugly part -- i passed different interfaces for planner as a parameter for these functions -- e.g. for CompleteStreamIngestion()
it's sql.JobExecContext
, for StartReplicationStream()
it's sql.PlanHookState
. It's because I had to adapt to the different methods of *sql.planner
called within them.
f5e462a
to
9ce2d85
Compare
I feel like this got complicated. Consider the following: StreamManagerFactory StreamManagerFactory
}
type StreamManagerFactory interface {
GetReplicationStreamManager() ReplicationStreamManager
GetStreamIngestManager() StreamIngestManager
}
// ReplicationStreamManager represents a collection of APIs that streaming replication supports
// on the production side.
type ReplicationStreamManager interface {
// StartReplicationStream starts a stream replication job for the specified tenant on the producer side.
StartReplicationStream(
ctx context.Context,
tenantID uint64,
) (streampb.StreamID, error)
// HeartbeatReplicationStream sends a heartbeat to the replication stream producer, indicating
// consumer has consumed until the given 'frontier' timestamp. This updates the producer job
// progress and extends its life, and the new producer progress will be returned.
// If 'frontier' is hlc.MaxTimestamp, returns the producer progress without updating it.
HeartbeatReplicationStream(
ctx context.Context,
streamID streampb.StreamID,
frontier hlc.Timestamp,
) (streampb.StreamReplicationStatus, error)
// StreamPartition starts streaming replication on the producer side for the partition specified
// by opaqueSpec which contains serialized streampb.StreamPartitionSpec protocol message and
// returns a value generator which yields events for the specified partition.
StreamPartition(
streamID streampb.StreamID,
opaqueSpec []byte,
) (ValueGenerator, error)
// GetReplicationStreamSpec gets a stream replication spec on the producer side.
GetReplicationStreamSpec(
ctx context.Context,
streamID streampb.StreamID,
) (*streampb.ReplicationStreamSpec, error)
// CompleteReplicationStream completes a replication stream job on the producer side.
// 'successfulIngestion' indicates whether the stream ingestion finished successfully and
// determines the fate of the producer job, succeeded or canceled.
CompleteReplicationStream(
ctx context.Context,
streamID streampb.StreamID,
successfulIngestion bool,
) error
}
// StreamIngestManager represents a collection of APIs that streaming replication supports
// on the ingestion side.
type StreamIngestManager interface {
// CompleteStreamIngestion signals a running stream ingestion job to complete on the consumer side.
CompleteStreamIngestion(
ctx context.Context,
ingestionJobID jobspb.JobID,
cutoverTimestamp hlc.Timestamp,
) error
// GetStreamIngestionStats gets a statistics summary for a stream ingestion job.
GetStreamIngestionStats(
ctx context.Context,
ingestionJobID jobspb.JobID,
) (*streampb.StreamIngestionStats, error)
} package streaming
// StreamID forwards the definition os streampb.StreamID.
type StreamID = streampb.StreamID
// InvalidStreamID is the zero value for StreamID corresponding to no stream.
const InvalidStreamID StreamID = 0
// GetReplicationStreamManagerHook is the hook to get access to the producer side replication APIs.
// Used by builtin functions to trigger streaming replication.
var GetReplicationStreamManagerHook func(ctx context.Context, evalCtx *eval.Context, tx *kv.Txn) (eval.ReplicationStreamManager, error)
// GetStreamIngestManagerHook is the hook to get access to the ingestion side replication APIs.
// Used by builtin functions to trigger streaming replication.
var GetStreamIngestManagerHook func(ctx context.Context, evalCtx *eval.Context, txn *kv.Txn) (eval.StreamIngestManager, error)
// GetReplicationStreamManager returns a ReplicationStreamManager if a CCL binary is loaded.
func GetReplicationStreamManager(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn,
) (eval.ReplicationStreamManager, error) {
if GetReplicationStreamManagerHook == nil {
return nil, errors.New("replication streaming requires a CCL binary")
}
return GetReplicationStreamManagerHook(ctx, evalCtx, txn)
}
// GetStreamIngestManager returns a StreamIngestManager if a CCL binary is loaded.
func GetStreamIngestManager(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn,
) (eval.StreamIngestManager, error) {
if GetReplicationStreamManagerHook == nil {
return nil, errors.New("replication streaming requires a CCL binary")
}
return GetStreamIngestManagerHook(ctx, evalCtx, txn)
} Then implement just |
I'm confused -- do we still want to define those methods in // GetReplicationStreamManagerHook is the hook to get access to the producer side replication APIs.
// Used by builtin functions to trigger streaming replication.
var GetReplicationStreamManagerHook func(ctx context.Context, evalCtx *eval.Context, tx *kv.Txn) (eval.ReplicationStreamManager, error)
// GetStreamIngestManagerHook is the hook to get access to the ingestion side replication APIs.
// Used by builtin functions to trigger streaming replication.
var GetStreamIngestManagerHook func(ctx context.Context, evalCtx *eval.Context, txn *kv.Txn) (eval.StreamIngestManager, error) I thought we'd like to deprecate sending an eval context to functions in |
I don't care about the |
bc5f111
to
0eb68d2
Compare
|
0eb68d2
to
eb4c867
Compare
…ager to eval This commit: 1. moves the definition of StreamIngestionManager and ReplicationStreamManager to eval; 2. has planner implements functions in StreamIngestionManager and ReplicationStreamManager, so that they won't take eval.Context and evalCtx.Txn as parameters. Release note: None
eb4c867
to
431b1f4
Compare
This PR is ready for another look. Note that I haven't covered changes with the internal executor yet, as I'm leaning towards having another PR to deprecate |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stevendanna can you sign off on this/signal your awareness?
@ZhouXing19, please update the PR title before merging.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @rhu713)
StreamIngestionManager
and ReplicationStreamManager
to sql.plannerevalCtx
, txn
as parameters to ingestion & replication funcs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ZhouXing19, please update the PR title before merging.
Done
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner and @rhu713)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the ping. Overall this seems like a good thing.
One question I have is whether moving StreamID to streampb was necessary to resolve a dependency conflict. If not, it is the only bit of this I might reconsider just so we don't have CCL packages being imported in non-test code pkg/sql.
We definitely don't want to use the one in the |
I don't really get this in that the
I couldn't care less about the direct import vs indirect. |
If you did want to move it, I think |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having #91005 open works for me.
Thanks for this cleanup!
Thanks all for reviewing! |
Build succeeded: |
This PR is part of the effort to eliminate usages of
eval.Context.Txn
.It moves
StreamIngestionManager
andReplicationStreamManager
under eval;StreamIngestionManager
andReplicationStreamManager
viasql.planner
.The core changes are
so that the functions under these 2 interfaces run upon
eval.Context
andkv.Txn
from thesql.planner
.Follow-up:
register.ex
.informs #90923